Doris数据源

DataWorks数据集成支持使用Doris Writer导入表数据至Doris。本文为您介绍DataWorks的Doris数据同步能力支持情况。

支持的Doris版本

Doris Writer使用的驱动版本是MySQL Driver 5.1.47,该驱动支持的内核版本如下。驱动能力详情请参见Doris官网文档

Doris 版本

是否支持

0.x.x

支持

1.1.x

支持

1.2.x

支持

2.x

支持

使用限制

数据集成仅支持离线写入Doris。

支持的字段类型

不同Doris版本支持不同的数据类型和聚合模型。各版本Doris的全量字段类型请参见Doris的官方文档,下面为您介绍Doris当前主要字段的支持情况。

类型

支持模型

Doris版本

离线写入(Doris Writer)

SMALLINT

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

INT

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

BIGINT

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

LARGEINT

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

FLOAT

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

DOUBLE

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

DECIMAL

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

DECIMALV3

Aggregate,Unique,Duplicate

1.2.1+、2.x

支持

DATE

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

DATETIME

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

DATEV2

Aggregate,Unique,Duplicate

1.2.x、2.x

支持

DATATIMEV2

Aggregate,Unique,Duplicate

1.2.x、2.x

支持

CHAR

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

VARCHAR

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

STRING

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

VARCHAR

Aggregate,Unique,Duplicate

1.1.x、1.2.x、2.x

支持

ARRAY

Duplicate

1.2.x、2.x

支持

JSONB

Aggregate,Unique,Duplicate

1.2.x、2.x

支持

HLL

Aggregate

0.x.x、1.1.x、1.2.x、2.x

支持

BITMAP

Aggregate

0.x.x、1.1.x、1.2.x、2.x

支持

QUANTILE_STATE

Aggregate

1.2.x、2.x

支持

实现原理

Doris Writer通过Doris原生支持的StreamLoad方式导入数据,Doris Writer会将Reader端读取到的数据缓存在内存中,并拼接成文本,然后批量导入至Doris数据库。更多详情请参见Doris官方文档

数据同步前准备

在DataWorks上进行数据同步前,您需要参考本文提前在Doris侧进行数据同步环境准备,以便在DataWorks上进行Doris数据同步任务配置与执行时服务正常。以下为您介绍Doris同步前的相关环境准备。

准备工作1:确认Doris的版本

数据集成对Doris版本有要求,您可以参考上文支持的Doris版本章节,查看当前待同步的Doris是否符合版本要求。您可以在Doris的官方网站下载对应的版本,并且安装。

准备工作2:创建账号,并配置账号权限

您需要规划一个数据仓库的登录账号用于后续操作,同时,您需要为该账号设置密码,以便后续连接到数据仓库。如果您要使用Doris默认的root用户进行登录,那么您需要为root用户设置密码。root用户默认没有密码,您可以在Doris上执行SQL来设置密码:

SET PASSWORD FOR 'root' = PASSWORD('密码')

准备工作3:配置Doris的网络连接

使用StreamLoad导入数据,需要访问FE节点的私网地址。如果访问FE的公网地址,则会被重定向到BE节点的内网IP(数据操作问题)。因此,您需要将Doris的网络与数据集成使用的Serverless资源组或独享数据集成资源组打通,使之通过内网地址进行访问。网络打通的具体操作可以参考网络连通方案

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源详细的配置参数解释可在配置界面查看对应参数的文案提示

下面对Doris数据源的几个配置项进行说明:

  • JdbcUrl:请填写JDBC连接串,包含IP、端口号、数据库和连接参数。支持公网IP和私网IP,如果使用公网IP,请确保数据集成资源组能够正常访问Doris所在的主机。

  • FE endpoint:请填写FE节点的IP和端口。如果您的集群中有多个FE节点,可以配置多个FE节点的IP和端口,每个IP和端口以逗号分隔,例如ip1:port1,ip2:port2。在测试连通性时,会对所有的FE endpoint做连通性测试。

  • 用户名:请填写Doris数据库的用户名。

  • 密码:请填写Doris数据库对应用户的密码。

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

单表离线同步任务配置指导

附录:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Writer脚本Demo

{
  "stepType": "doris",//插件名。
  "parameter":
  {
    "postSql"://执行数据同步任务之后率先执行的SQL语句。
    [],
    "preSql":
    [],//执行数据同步任务之前率先执行的SQL语句。
    "datasource":"doris_datasource",//数据源名。
    "table": "doris_table_name",//表名。
    "column":
    [
      "id",
      "table_id",
      "table_no",
      "table_name",
      "table_status"
    ],
    "loadProps":{
      "column_separator": "\\x01",//指定CSV格式的列分隔符
      "line_delimiter": "\\x02"//指定CSV格式的行分隔符
    }
  },
  "name": "Writer",
  "category": "writer"
}

Writer脚本参数

参数

描述

是否必选

默认值

datasource

数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。

table

选取的需要同步的表名称。

column

目标表需要写入数据的字段,字段之间用英文逗号分隔。例如"column":["id","name","age"]。如果要依次写入全部列,使用(*)表示,例如"column":["*"]

preSql

执行数据同步任务之前率先执行的SQL语句。目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句,例如,执行前清空表中的旧数据。

postSql

执行数据同步任务之后执行的SQL语句。目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句,例如,加上某一个时间戳。

maxBatchRows

每批次导入数据的最大行数。和batchSize共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。

500000

batchSize

每批次导入数据的最大数据量。和maxBatchRows共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。

104857600

maxRetries

每批次导入数据失败后的重试次数。

3

labelPrefix

每批次上传文件的 label 前缀。最终的label将有labelPrefix + UUID组成全局唯一的label,确保数据不会重复导入。

datax_doris_writer_

loadProps

StreamLoad的请求参数,主要用于配置导入的数据格式。默认以CSV格式导入。如果loadProps没有配置,则采用默认的CSV格式,以\t为列分隔符,\n为行分隔符,配置如下所示。

"loadProps": {
    "format":"csv",
    "column_separator": "\t",
    "line_delimiter": "\n"
}

如果您需要指定为JSON格式导入,则配置如下所示。

"loadProps": {
    "format": "json"
}

聚合类型脚本(Doris Writer写入聚合类型)

Doris Writer支持写入聚合类型,但是需要在脚本模式中做额外的配置。如下所示。

例如,对于如下一张Doris的表,其中,uuidbitmap类型(聚合类型)、sexHLL类型(聚合类型)。

CREATE TABLE `example_table_1` (
  `user_id` int(11) NULL,
  `date` varchar(10) NULL DEFAULT "10.5",
  `city` varchar(10) NULL,
  `uuid` bitmap BITMAP_UNION NULL, -- 聚合类型
  `sex` HLL HLL_UNION  -- 聚合类型
) ENGINE=OLAP AGGREGATE KEY(`user_id`, `date`,`city`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`user_id`) BUCKETS 32

往表中插入原始数据:

user_id,date,city,uuid,sex
0,T0S4Pb,abc,43,'54'
1,T0S4Pd,fsd,34,'54'
2,T0S4Pb,fa3,53,'64'
4,T0S4Pb,fwe,87,'64'
5,T0S4Pb,gbr,90,'56'
2,iY3GiHkLF,234,100,'54'

通过Doris Writer写入聚合类型时,不仅需要在writer.parameter.column中指定该列,还需要在writer.parameter.loadProps.columns中配置聚合函数。例如,为uuid使用聚合函数bitmap_hash,为sex使用聚合函数hll_hash

脚本模式示例:

{
    "stepType": "doris",//插件名。
    "writer":
    {
        "parameter":
        {
            "column":
            [
                "user_id",
                "date",
                "city",
                "uuid",// 聚合类型bitmap
                "sex"// 聚合类型HLL
            ],
            "loadProps":
            {
                "format": "csv",
                "column_separator": "\\x01",
                "line_delimiter": "\\x02",
                "columns": "user_id,date,city,k1,uuid=bitmap_hash(k1),k2,sex=hll_hash(k2)"// 需要指定聚合函数
            },
            "postSql":
            [
                "select count(1) from example_tbl_3"
            ],
            "preSql":
            [],
            "datasource":"doris_datasource",//数据源名。
                    "table": "doris_table_name",//表名。
        }
          "name": "Writer",
              "category": "writer"
    }
}